<?php
namespace app\modules\rabbitmq\server;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use app\modules\rabbitmq\api\CliConsumerApi;

class RabbitObserver implements RabbitAdapterServer
{

    /**消费者
     * @param $queueName
     */
    public function rabbitConsumer($queueName)
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();

        $channel->queue_declare($queueName, false, true, false, false);

        $callback = function ($msg) {
            $result = $msg->body;
            $resultData = json_decode($result,true);

            // 提供中转服务，推送至服务
            CliConsumerApi::RabbitmaTransfer($resultData['consumerclass'],$resultData['method'],$resultData['data']);

            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };

        $channel->basic_qos(null, 1, null);
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);

        while (count($channel->callbacks)) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }

    /**
     * 生产者
     * @param $queue
     * @param $exchange
     * @param $argv
     */
    public function rabbitProducer($queueName,$exchange,$argv)
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
        $channel = $connection->channel();


        $channel->queue_declare($queueName, false, true, false, false);

        $channel->exchange_declare($exchange, 'direct', false, true, false);

        $channel->queue_bind($queueName, $exchange);

        $argv = json_encode($argv);
        $argv = ['',$argv];

        $messageBody = implode(' ', array_slice($argv, 1));
        $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
        $channel->basic_publish($message, $exchange);

        $channel->close();
        $connection->close();
    }
}